package com.atguigu.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.common.Constant
import com.atguigu.realtime.bean.StartupLog
import com.atguigu.realtime.util.{MyKafkaUtil, MyRedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * Author atguigu
 * Date 2020/10/14 16:17
 */
object DauApp {
    /**
     * Streaming DAU (daily-active-users) job.
     *
     * Pipeline: Kafka startup-log topic -> parse JSON into [[StartupLog]] ->
     * keep only the first startup per device (mid) per day, deduplicated
     * across batches with a Redis set -> persist survivors to HBase via Phoenix.
     */
    def main(args: Array[String]): Unit = {
        // 1. Create a StreamingContext with 3-second micro-batches.
        val conf = new SparkConf().setAppName("DauApp").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 2. Consume the startup topic from Kafka as a stream of JSON strings.
        val sourceStream: DStream[String] = MyKafkaUtil.getKafkaStream(ssc, "DauApp", Constant.STARTUP_TOPIC)

        // 3.1 Parse each JSON record into the StartupLog case class.
        val startupLogStream: DStream[StartupLog] = sourceStream
            .map(jsonString => JSON.parseObject(jsonString, classOf[StartupLog]))

        // 3.2 Keep only the FIRST startup per device per day.
        // `sadd` returns 1 only when the member was not already in the set,
        // so exactly one record per (mid, logDate) survives across all batches.
        // transform/mapPartitions run the closure on executors with one Redis
        // connection per partition instead of one per record.
        val filteredStartupLogStream: DStream[StartupLog] = startupLogStream.transform(rdd => {
            rdd.mapPartitions(startupIt => {
                val client: Jedis = MyRedisUtil.getClient
                // BUG FIX: Iterator.filter is lazy. Previously the client was
                // closed before the downstream stage consumed the iterator, so
                // `sadd` executed against an already-closed connection.
                // Materialize the partition eagerly, THEN close the client.
                val firstVisits = startupIt.filter(log =>
                    client.sadd(s"dau:${log.logDate}", log.mid) == 1
                ).toList
                client.close()
                firstVisits.iterator
            })
        })

        // 3.3 Write each day's first-startup records to HBase through Phoenix.
        filteredStartupLogStream.foreachRDD(rdd => {
            println("-------------------------------------------")
            println(s"Time: ${System.currentTimeMillis()}")
            println("-------------------------------------------")
            import org.apache.phoenix.spark._
            rdd.saveToPhoenix(
                "GMALL_DAU",
                Seq("MID", "UID", "APPID", "AREA", "OS", "CHANNEL", "LOGTYPE", "VERSION", "TS", "LOGDATE", "LOGHOUR"),
                zkUrl = Option("hadoop102,hadoop103,hadoop104:2181"))
        })

        // 4. Start the streaming context.
        ssc.start()
        // 5. Block the main thread so the stream keeps running until terminated.
        ssc.awaitTermination()
    }
}

/*
Deduplicating a stream requires external shared state: here a Redis SET
(key "dau:<date>", members are device ids) provides exactly-once filtering
across batches and executors.

Phoenix JDBC driver class: org.apache.phoenix.jdbc.PhoenixDriver
 */